//  Copyright (c) 2020-present,  INSPUR Co, Ltd.  All rights reserved.
// This source code is licensed under Apache 2.0 License.

// liliupeng@inspur.com

#pragma once

#include <atomic>
#include <include/rocksdb/slice.h>
#include <include/rocksdb/types.h>
#include <db/dbformat.h>

#include "mvcc_key.h"
#include "rocksdb/types.h"
#include "util/coding.h"

namespace rocksdb {

class VersionNode {
public:
  enum struct NodeType {
    UNCODED,
    ENCODED
  };
  static constexpr size_t ENCODED_SEQUENCE_TYPE_LEN = 8;

protected:
  std::atomic<VersionNode *> prev_;
  std::atomic<VersionNode *> next_;
  NodeType nodeType_;

public:
  VersionNode(NodeType nodeType) : nodeType_(nodeType) {
    prev_.store(nullptr, std::memory_order_relaxed);
    next_.store(nullptr, std::memory_order_relaxed);
  }
  virtual ~VersionNode() {}

public:
  VersionNode * Next() {
    return next_.load(std::memory_order_acquire);
  }

  VersionNode * Prev() {
    return prev_.load(std::memory_order_acquire);
  }

  void SetNext(VersionNode *x) {
    next_.store(x, std::memory_order_release);
  }

  void SetPrev(VersionNode *x) {
    prev_.store(x, std::memory_order_release);
  }

  bool CASNext(VersionNode *expected, VersionNode *x) {
    return next_.compare_exchange_strong(expected, x);
  }

  bool CASPrev(VersionNode *expected, VersionNode *x) {
    return prev_.compare_exchange_strong(expected, x);
  }

  // No-barrier variants that can be safely used in a few locations.
  VersionNode * NoBarrier_Next() {
    return next_.load(std::memory_order_relaxed);
  }

  void NoBarrier_SetNext(int n, VersionNode *x) {
    next_.store(x, std::memory_order_relaxed);
  }

  NodeType GetNodeType() const {
    return nodeType_;
  }

public:
  virtual const char* Key() const = 0;

  virtual void GetMvccKey(MvccKey& mk, Slice& key) = 0;

  virtual void GetInternalKey(Slice& key, const char* buf) = 0;

  //key为internal_key
  virtual void GetValue(const Slice& key, Slice& value) = 0;

  virtual void GetSeqAndType(uint64_t seqNumAndType, uint64_t* seq, ValueType* t) = 0;

public:
  static void Encode(const Slice &key, const Slice &value,
                     SequenceNumber seq, ValueType type, char *buf) {
    /*
     *          5 bytes: encoded internal_key_size = key_size + 8
     *   key_size bytes: key
     *          8 bytes: encoded packed sequence number and value type
     *          5 bytes: encoded value_size
     * value_size bytes: value
     * 5 + key_size + 8 + 5 + value_size = encoded_len
     *
     * encoded_len
     * 1. the number of bytes that internal_key_size occupies
     * 2. internal_key_size = key_size + 8(encoded packed sequence number and value type)
     * 3. the number of bytes that value_size occupies
     * 4. value_size
     */
    size_t key_size = key.size();
    size_t val_size = value.size();
    size_t internal_key_size = key_size + ENCODED_SEQUENCE_TYPE_LEN;
    size_t encoded_len = VarintLength(internal_key_size) +
                         internal_key_size + VarintLength(val_size) +
                         val_size;
    char *p = EncodeVarint32(buf, internal_key_size);
    memcpy(p, key.data(), key_size);
    p += key_size;
    uint64_t packed = PackSequenceAndType(seq, type);
    EncodeFixed64(p, packed);
    p += ENCODED_SEQUENCE_TYPE_LEN;
    p = EncodeVarint32(p, val_size);
    memcpy(p, value.data(), val_size);
    assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
  }
  //key为internal_key
  static void Encode(const Slice &key, const Slice &value, char *buf) {
    size_t key_size = key.size();
    size_t val_size = value.size();
    size_t encoded_len = VarintLength(key_size) + key_size
                         + VarintLength(val_size) + val_size;
    char *p = EncodeVarint32(buf, key_size);
    memcpy(p, key.data(), key_size);
    p += key_size;
    p = EncodeVarint32(p, val_size);
    memcpy(p, value.data(), val_size);
    assert((unsigned)(p + val_size - buf) == (unsigned)encoded_len);
  }
};

}  // namespace rocksdb